/**********************************************************************
 *	Filename: core_mqtt.c
 *
 *	Copyright:(C)ZhouYanlin<www.zhouyanlin1222@qq.com>
 *
 *	Description: This file is MQTT client API based on Paho MQTT Embedded
 *
 *  Created on: 2021年10月22日
 *      Author: ZhouYanlin
 *
 **********************************************************************/

#include <stdio.h>
#include <string.h>

#include "stm32l4xx_hal.h"
#include "core_mqtt.h"

#define  MQTT_DBG_PRINT
#ifdef 	 MQTT_DBG_PRINT
#define  mqtt_printf(format,args...) printf(format, ##args)
#else
#define  mqtt_printf(format,args...) do{} while(0)
#endif

int mqtt_connect(char *host, int port, char *clientid, char *username, char *passwd)
{
    MQTTPacket_connectData          data = MQTTPacket_connectData_initializer;
    int                             rv;
    unsigned char                   buf[256];
    unsigned char                   sessionPresent;
    unsigned char                   connack_rc;

    if( !host || port<=0 || !clientid )
    {
    	mqtt_printf("ERROR: Invalid input arguments\r\n");
    	return -1;
    }

	if( (rv=transport_open(host, port)) < 0 )
	{
		mqtt_printf("socket connect [%s:%d] failure, rv=%d\r\n", host, port, rv);
		return rv;
	}
	mqtt_printf("socket connect [%s:%d] ok\r\n", host, port);

    data.clientID.cstring = clientid;
    data.keepAliveInterval = MQTT_KEEP_ALIVE_TIMEOUT_SECONDS;
    data.cleansession = 1;

    if( username && passwd )
    {
    	data.username.cstring = username;
    	data.password.cstring = passwd;
    }

    /* 将连接参数打包成MQTT传输格式 */
    rv=MQTTSerialize_connect(buf, sizeof(buf), &data);
    if( rv < 0 )
    {
        mqtt_printf("MQTTSerialize_connect failure, rv=%d\n", rv);
        return -1;
    }

    if ( rv != transport_sendPacketBuffer(buf,rv) )
    {
        mqtt_printf("transport_sendPacketBuffer for mqtt_connect failure, rv=%d\n", rv);
        return -2;
    }

    HAL_Delay(800);

    /* 接收broker的数据并进行分析 */
    memset(buf, 0, sizeof(buf));
    rv = MQTTPacket_read(buf, sizeof(buf), transport_getdata);
    if( CONNACK != rv )
    {
        mqtt_printf("MQTTPacket_read for MQTT CONNACK failure, rv=%d\n", rv);
        return -3;
    }

    /*解析已经分析的CONNACK报文并进行对比（connack_rc连接返回码：0x00表示成功）*/
    if( (rv=MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, sizeof(buf))) != 1 || connack_rc!=0)
    {
        mqtt_printf("MQTTDeserialize_connack failure, rv=%d\n", rv);
        return -4;
    }

    return 0;
}


int mqtt_disconnect(void)
{
    int                             rv;
    unsigned char                   buf[256];


    /*将断开连接的数据包序列化到提供的缓冲区，以便写入到套接字*/
    rv = MQTTSerialize_disconnect(buf, sizeof(buf));
    if( rv < 0 )
    {
        mqtt_printf("MQTTSerialize_disconnect failure, rv=%d\n", rv);
        return -1;
    }

    if ( rv != transport_sendPacketBuffer(buf,rv) )
    {
        mqtt_printf("transport_sendPacketBuffer for mqtt_disconnect failure, rv=%d\n", rv);
        return -2;
    }

    return 0;
}

int mqtt_subscribe_topic(char *topic, int qos, int msgid)
{
    MQTTString                      topicString = MQTTString_initializer;
    unsigned short                  submsgid;
    int                             subcount, granted_qos;
    int                             rv;
    unsigned char                   buf[256];

    topicString.cstring = topic;

    /*将提供的订阅数据序列化到提供的缓冲区中，以便发送*/
    rv = MQTTSerialize_subscribe(buf, sizeof(buf), 0, msgid, 1, &topicString, &qos);
    if( rv < 0 )
    {
        mqtt_printf("MQTTSerialize_subscribe failure, rv=%d\n", rv);
        return -1;
    }

    if ( rv != transport_sendPacketBuffer(buf,rv) )
    {
        mqtt_printf("transport_sendPacketBuffer for mqtt_subscribe_topic failure, rv=%d\n", rv);
        return -2;
    }

    HAL_Delay(800);

   /* 接收并解析数据*/
    memset(buf, 0, sizeof(buf));
    rv = MQTTPacket_read(buf, sizeof(buf), transport_getdata);
    if( SUBACK != rv)
    {
        mqtt_printf("MQTTPacket_read for MQTT SUBACK failure, rv=%d\n", rv);
        return -3;
    }

    /*反向解析收到的数据*/
    rv = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, sizeof(buf));
    if( !rv || submsgid!=msgid || granted_qos==0x80)
    {
        mqtt_printf("MQTTDeserialize_suback failure, rv=%d\n", rv);
        return -4;
    }

    return 0;
}

int mqtt_unsubscribe_topic(char *topic, int msgid)
{
    MQTTString                      topicString = MQTTString_initializer;
    unsigned short                  submsgid;
    int                             rv;
    unsigned char                   buf[256];

    topicString.cstring = topic;

    rv = MQTTSerialize_unsubscribe(buf, sizeof(buf), 0, msgid, 1, &topicString);
    if( rv < 0 )
    {
        mqtt_printf("MQTTSerialize_subscribe failure, rv=%d\n", rv);
        return -1;
    }

    if ( rv != transport_sendPacketBuffer(buf,rv) )
    {
        mqtt_printf("transport_sendPacketBuffer for mqtt_unsubscribe_topic failure, rv=%d\n", rv);
        return -2;
    }

    HAL_Delay(800);

    memset(buf, 0, sizeof(buf));
    rv = MQTTPacket_read(buf, sizeof(buf), transport_getdata);
    if( UNSUBACK != rv)
    {
        mqtt_printf("MQTTPacket_read for MQTT UNSUBACK failure, rv=%d\n", rv);
        return -3;
    }

    rv = MQTTDeserialize_unsuback(&submsgid, buf, sizeof(buf));
    if( !rv || submsgid!=msgid )
    {
        mqtt_printf("MQTTDeserialize_unsuback failure, rv=%d\n", rv);
        return -4;
    }

    return 0;
}


int mqtt_publish(char *topic, int qos, char *payload)
{
    MQTTString                      topicString = MQTTString_initializer;
    int                             rv;
    unsigned char                   buf[256];
    unsigned char                   dup = 0;
    unsigned char                   retained = 0;
    unsigned short                  packetid = 0;

    topicString.cstring = topic;

    rv = MQTTSerialize_publish(buf, sizeof(buf), dup, qos, retained, packetid, topicString, (unsigned char*)payload, strlen(payload));
    if( rv < 0 )
    {
        mqtt_printf("MQTTSerialize_publish failure, rv=%d\n", rv);
        return -1;
    }

    if ( rv != transport_sendPacketBuffer(buf,rv) )
    {
        mqtt_printf("transport_sendPacketBuffer for MQTTSerialize_publish failure, rv=%d\n", rv);
        return -2;
    }

    HAL_Delay(800);

    memset(buf, 0, sizeof(buf));
    rv = MQTTPacket_read(buf, sizeof(buf), transport_getdata);
    if( PUBLISH!=rv && -1!=rv )
    {
        mqtt_printf("MQTTPacket_read for MQTT PUBLISH failure, rv=%d\n", rv);
        return -3;
    }

    return 0;
}


int mqtt_pingreq(void)
{
    int                             rv;
    unsigned char                   buf[256];

    rv = MQTTSerialize_pingreq(buf, sizeof(buf));
    if( rv < 0 )
    {
        mqtt_printf("MQTTSerialize_pingreq failure, rv=%d\n", rv);
        return -1;
    }

    if ( rv != transport_sendPacketBuffer(buf,rv) )
    {
        mqtt_printf("transport_sendPacketBuffer for MQTTSerialize_pingreq failure, rv=%d\n", rv);
        return -2;
    }

    HAL_Delay(800);

    memset(buf, 0, sizeof(buf));
    rv = MQTTPacket_read(buf, sizeof(buf), transport_getdata);
    if( PINGRESP != rv)
    {
        mqtt_printf("MQTTPacket_read for MQTT PINGRESP failure, rv=%d\n", rv);
        return -3;
    }

    return 0;
}


